import pandas as pd
import numpy as np
import cv2
import os
# plotly
import plotly
import plotly.graph_objects as go
from plotly.graph_objs import Scatter, Marker, Layout, XAxis, YAxis, Bar, Line
plotly.offline.init_notebook_mode()
# matplotlib
%matplotlib inline
from matplotlib import pyplot as plt
Dataset - https://www.kaggle.com/puneet6060/intel-image-classification/
In this lab, we will construct a multi-layer perceptron that performs image classification on a dataset created by Intel. The dataset was originally created by Intel for an image classification challenge hosted by Analytics Vidhya. It consists of 25,000 images, each 150x150 pixels in size, split into ~14,000 training images, ~3,000 test images, and ~7,000 prediction/validation images. The train-test split was already done; however, since the ~7,000 prediction/validation images have no associated class labels, the resulting usable dataset is closer to ~17,000 images. We decided to combine the test and train images into one dataset that will later be split into train, test, and validation sets again. This allows us to ensure we are using a stratified shuffle split for modeling. In total there are 6 classes of images, as follows:
Classifying images by their context or biome is certainly an important and potentially useful form of machine learning. In our case, we are classifying images by their general surrounding/biome into one of the 6 classes seen above. This could prove useful to some individuals or organizations, but may not be of much use to larger 3rd parties that wish to use it commercially. Therefore, this lab serves more as a proof of concept that the approach could be expanded to other categories and prove much more useful to 3rd parties, depending on what they are trying to classify in terms of background, surrounding, or context.
Our first use case for 3rd parties that could work with even these limited categories could be to identify stock photos by their category. For example, let's take a website like pexels.com, which hosts thousands of stock images. In order to provide a reasonable means of browsing these photos, they would either have to index the images manually (forcing the uploader to specify categories, or hiring workers to do so) or implement a classifier such as this one to automate the process. This could prove extremely useful to a company like pexels.com, especially if they receive hundreds of images per day.
In relation to the above use case, another application of this classifier could be to classify and index images for a search engine platform. Search engines like Google, Bing, and DuckDuckGo provide the ability to search images too. These images obviously need to be tied to some sort of category or tag system; thus, our classification algorithm could be used to index images for search engines. This could prove very useful because not only would it allow the company to automate the process, it might classify some images better than they are currently classified, bringing better content to the user of the platform. Given the vast number of images that currently exist, there are bound to be errors in how they were classified due to human error. With adequate performance, this algorithm could outperform humans! If successful, this would help bring more users to a company's platform and in turn raise its revenue.
Lastly, one use case could be for Apple's or Android's photos app. One or both of these companies could begin to classify (in the background) images in a user's album so the user could browse all images of a particular class. For example, with our proof of concept, images taken in rugged/mountainous terrain could be classified as mountain/nature photos, which users could then browse. The same essentially goes for any other category. Thus, Apple or Android would be interested in using this algorithm to draw people to their devices/platforms, which will, in turn, lead to higher revenues and larger market control. This would be especially interesting to Apple, since they enjoy coercing users to adopt only their products. The same could be done for Snapchat or any other company that stores/hosts user image data.
Throughout this lab, our main objective is to accurately classify an image into one of the 6 categories mentioned in the introduction. A human should be able to classify these images into these categories, so our neural network should be able to get reasonably close as well. Even though we use greyscale, none of the categories above particularly depends on a specific color for classification. Applying this later to search engines, stock photo websites, or even Apple's Photos application could automate the process of classifying an image by its biome/surrounding and save companies money.
In order to entice companies such as Apple to use our image background classification model on their platforms, we need to classify image backgrounds not only with a high degree of accuracy but also while minimizing false positives and false negatives. This ensures that a user does not see images outside the given search criteria (fewer false positives) and does see the images they searched for (fewer false negatives). Using the Macro F1-score, we want our classifier to get as close to 1 as possible, but given the limitations of our data size and the possible need for color images rather than greyscale, we may not reach our intended objective in this case study.
$$F1 = 2*\frac{Precision*Recall}{Precision+Recall}$$

Below we load in our image data. As you can see, we load in the train and the test data and then combine the two into one master numpy array. After loading, we have an images array that contains the image data, a labels_ext array which contains tuples of each image's filename and its corresponding class, and a labels array which records ONLY what each image is classified as (index locations in the images array map to index locations in the labels array).
However, it should be noted that for our dataset the pre-processing was already done for an Intel competition. The data was split into ~14k images in train, ~3k in test, and ~7k in prediction. We merged the train and test sets as mentioned above to get a dataset of ~17k images, all 150x150px. The prediction dataset could not be included because its data has no associated classifications.
file_path_train = './data/images/seg_train'
file_path_test = './data/images/seg_test'
directory_dir_train = os.listdir(file_path_train)
directory_dir_test = os.listdir(file_path_test)
# Mac -- remove .DS_Store if it's there.
# (handled separately so a missing .DS_Store in train doesn't skip the test removal)
try:
    directory_dir_train.remove('.DS_Store')
except ValueError:
    print("Did not find and remove a .DS_Store in train")
try:
    directory_dir_test.remove('.DS_Store')
except ValueError:
    print("Did not find and remove a .DS_Store in test")
#read in images
images_raw = []
labels_ext = []
error_count = 0
wrong_size = 0
# FOR TRAIN SET
for directory in directory_dir_train:
    for img in os.listdir(file_path_train + '/' + directory):
        image = cv2.imread(file_path_train + '/' + directory + '/' + img, cv2.IMREAD_GRAYSCALE)
        try:
            flat_img = image.flatten()
            if flat_img.size == 22500:
                images_raw.append(flat_img)
                labels_ext.append((directory, img))
            else:
                wrong_size += 1
        except AttributeError:  # cv2.imread returns None for unreadable files
            error_count += 1
# FOR TEST SET
for directory in directory_dir_test:
    for img in os.listdir(file_path_test + '/' + directory):
        image = cv2.imread(file_path_test + '/' + directory + '/' + img, cv2.IMREAD_GRAYSCALE)
        try:
            flat_img = image.flatten()
            if flat_img.size == 22500:
                images_raw.append(flat_img)
                labels_ext.append((directory, img))
            else:
                wrong_size += 1
        except AttributeError:  # cv2.imread returns None for unreadable files
            error_count += 1
# create our variables for images and labels
images = np.array(images_raw)
labels_ext = np.array(labels_ext)
labels = [i[0] for i in labels_ext]
# specify height and width
h, w = (150,150)
print("{} images failed to load".format(error_count))
print("{} images were the wrong size".format(wrong_size))
print("Image size:", images[0].shape)
print("Image Count:", len(images))
# select random images to visualize
import random
random.seed(1)
# function to plot images in a grid
def plot_gallery(images, title, h, w, n_row=3, n_col=6):
    plt.figure(figsize=(1.7 * n_col, 2.3 * n_row))
    plt.subplots_adjust(bottom=0, left=.01, right=.99, top=.90, hspace=.35)
    rand_sample = random.sample(range(0, images.shape[0]), k=n_row * n_col)
    for n, i in enumerate(rand_sample):
        plt.subplot(n_row, n_col, n + 1)
        plt.imshow(images[i].reshape((h, w)), cmap=plt.cm.gray)
        plt.title(title[i], size=12)
        plt.xticks(())
        plt.yticks(())
Below is a sample plot of 18 images (in grayscale) from the dataset
# plot images
plot_gallery(images, labels, h, w)
Due to the large size of our dataset, we decided to sample a subset of each class. The dataset contains between 2628 and 3037 instances per class, as depicted in the table and chart below. We decided it was acceptable to take a random sample of 1000 instances of each class to use for our dataset. This way we get an equal number of instances per class, which may help improve our training metric.
df_labels = pd.DataFrame(labels_ext, columns =['Class', 'Name'])
df_labels_count = df_labels['Class'].value_counts().reset_index()
df_labels_count.columns = ['Class', 'Count']
df_labels_count
fig = go.Figure([go.Bar(x=df_labels_count['Class'], y=df_labels_count['Count'])])
fig.update_layout(
title = 'Counts of Unique Classes In DataSet',
)
fig.show()
df_forest = df_labels.loc[df_labels['Class'] == 'forest']
df_mountain = df_labels.loc[df_labels['Class'] == 'mountain']
df_glacier = df_labels.loc[df_labels['Class'] == 'glacier']
df_street = df_labels.loc[df_labels['Class'] == 'street']
df_sea = df_labels.loc[df_labels['Class'] == 'sea']
df_buildings = df_labels.loc[df_labels['Class'] == 'buildings']
pd_labels_sample = pd.concat([
df_forest['Name'].sample(n=1000),
df_mountain['Name'].sample(n=1000),
df_glacier['Name'].sample(n=1000),
df_street['Name'].sample(n=1000),
df_sea['Name'].sample(n=1000),
df_buildings['Name'].sample(n=1000)
])
labels_sample = pd_labels_sample.values
labels_sample
Here we select the 1000-image sample from each class.
images
df_images_labels = pd.DataFrame(data=[
df_labels['Name'].values,
df_labels['Class'].values,
images
]).T
df_images_labels
images_sample = df_images_labels.loc[
df_images_labels[0].isin(labels_sample)
].reset_index(drop=True)
images_sample.columns = ['name', 'class', 'image']
images_sample
As you can see, we now have a dataframe that contains 1000 instances per class, for a total of 6000 instances.
# select random images to visualize
import random
random.seed(1)
# function to plot images in a grid, from a DataFrame
def plot_gallery_df(images_df, title_col, image_col, h, w, n_row=3, n_col=6):
    plt.figure(figsize=(1.7 * n_col, 2.3 * n_row))
    plt.subplots_adjust(bottom=0, left=.01, right=.99, top=.90, hspace=.35)
    rand_sample = images_df.sample(n=n_row * n_col)
    count = 0
    for index, row in rand_sample.iterrows():
        plt.subplot(n_row, n_col, count + 1)
        count += 1
        plt.imshow(row[image_col].reshape((h, w)), cmap=plt.cm.gray)
        plt.title(row[title_col], size=12)
        plt.xticks(())
        plt.yticks(())
# plot images
plot_gallery_df(images_sample, 'class', 'image', 150, 150)
We will also use PCA to perform dimensionality reduction on our images. This will help speed up the neural network during training and won't degrade our performance metric too much. As seen below, we were able to reduce the size of a flattened image from 22,500 to 4,000 features and still retain a 98.7% cumulative explained variance ratio.
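As a side note, sklearn's PCA can also pick the number of components for you when given a target explained-variance fraction instead of a fixed component count. A minimal sketch on synthetic data (the array shapes here are hypothetical, not our image data):

```python
import numpy as np
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 50))  # 200 synthetic samples, 50 features

# Passing a float in (0, 1) asks PCA for the smallest number of
# components whose cumulative explained variance reaches that fraction.
pca = PCA(n_components=0.95)
X_reduced = pca.fit_transform(X)

print(X_reduced.shape[1], pca.explained_variance_ratio_.sum())
```

This is an alternative to fixing `n_components=4000` by hand, at the cost of not knowing the reduced dimensionality in advance.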
from sklearn.decomposition import PCA
def plot_explained_variance(pca):
    explained_var = pca.explained_variance_ratio_
    cum_var_exp = np.cumsum(explained_var)
    fig_pca = go.Figure([
        go.Bar(y=explained_var, name='individual explained variance'),
        go.Scatter(y=cum_var_exp, name='cumulative explained variance'),
    ])
    fig_pca.update_layout(
        title='PCA Explained Variance',
        xaxis=dict(title='Principal components'),
        yaxis=dict(title='Explained variance ratio'),
    )
    fig_pca.show()
%%time
# re-convert the image column back to a 2-D numpy matrix
images_dim = []
for img in images_sample['image'].values:
    images_dim.append(img)
images_dim = np.array(images_dim)
pca = PCA(n_components=4000)
pca_fit = pca.fit_transform(images_dim)
plot_explained_variance(pca)
pca_fit.shape
pca_fit
final_image_set = pd.DataFrame(data=[
images_sample['class'].values,
pca_fit
]).T
final_image_set.columns = ['class', 'image']
final_image_set
To summarize, our final dataset consists of 6000 instances split evenly across 6 classes, such that there are 1000 instances per class. We have performed dimensionality reduction on the feature data, reducing the feature size of an image from 22,500 to 4,000. This is the dataset that we will use for our train-test split.
To evaluate our algorithm's generalization performance, we have decided to go with the Macro F1 score metric. The F1-score is the harmonic mean of precision and recall, where the best possible value is 1 and the worst is 0. Precision is calculated by taking the number of true positives and dividing by the sum of the true positives and false positives. $$Precision = \frac{True Positives}{True Positives +False Positives}$$
Recall is calculated by taking the number of true positives and dividing it by the sum of the true positives and false negatives. $$Recall = \frac{True Positives}{True Positives +False Negatives}$$
Precision or recall by itself does not give a complete measure of how well a classifier is performing; for example, if the predictions consist mainly of false negatives and true negatives, precision alone would give a misleading picture of the classifier's performance. Combining the two therefore gives a much better understanding. The F1-score for an individual class is calculated by multiplying precision and recall, dividing by the sum of precision and recall, and multiplying by 2. $$F1 = 2*\frac{Precision*Recall}{Precision+Recall}$$
The above F1-score is calculated for only one class. To get the macro F1-score, our chosen metric, the per-class F1-scores are summed and divided by the number of classes N. $$ Macro F1 = \frac{\sum_{i=1}^{N}F1\text{-}score_i}{N}$$
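To make the computation concrete, here is a small sketch using sklearn's `f1_score` on made-up toy labels (the label values below are hypothetical, purely for illustration):

```python
from sklearn.metrics import f1_score

# Hypothetical predictions over three classes.
y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 1, 1, 1, 2, 0]

# average=None returns the per-class F1 scores;
# average='macro' averages them with equal weight per class,
# matching the formula above.
per_class = f1_score(y_true, y_pred, average=None)
macro = f1_score(y_true, y_pred, average='macro')
print(per_class, macro)
```

Because every class contributes equally regardless of its support, macro F1 punishes a classifier that does well only on the frequent classes.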
For our business case, where we would need to classify stock images for search, we need to minimize the number of false positives and false negatives. The reason for placing a larger emphasis on minimizing both is that a user would be confused and annoyed to see an image that is not in the category they were looking for, or to be unable to find the type of image they want. We believe that Macro F1 is the best measure because it takes into higher consideration the misclassification of true labels, which is what we are aiming to minimize in our business case. We prefer this metric over log loss because log loss only takes into account the predicted probabilities, which makes it sensitive to imbalanced data. Log loss therefore does not give us insight into the number of false positives if the dataset is imbalanced, while Macro F1 does. We decided against the area-under-the-curve metric because it is primarily used in binary classification and our business case calls for multiple classes.
We chose to use a stratified shuffle split to divide our data into a train set, test set, and validation set. The reason we chose a stratified shuffle split is that our dataset is quite small; using SSS, we are able to maintain an equal number of instances per class in each split. This is further clarified below, but essentially means the counts of images in each class (in, say, the training dataset) are the same. This helps the neural network train more evenly across the different classes, which in turn should improve our performance metric and prevent class imbalance.
After the first split, the training dataset contains 80% of the 6000 instances, or 4800 (as seen below), and the test and validation datasets make up the other 20%, or 1200.
After the second split, that 20% is split in half between test and validation as seen below. This results in the test and validation datasets having 600 instances each, or 10% of our 6000-instance dataset.
Overall, we end up with 3 datasets that each contain equal numbers of images per class. The sizes of those datasets can be seen below.
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.metrics import accuracy_score, recall_score
# get train and test split
split_train = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
for train_index, test_index in split_train.split(final_image_set, final_image_set['class']):
    train_set = final_image_set.loc[train_index].reset_index(drop=True)
    test_valid_set = final_image_set.loc[test_index].reset_index(drop=True)
# get test and validation split
split_test = StratifiedShuffleSplit(n_splits=1, test_size=0.5, random_state=65)
for train_index, test_index in split_test.split(test_valid_set, test_valid_set['class']):
    test_set = test_valid_set.loc[train_index].reset_index(drop=True)
    validation_set = test_valid_set.loc[test_index].reset_index(drop=True)
print("Shape of Train Set: {}".format(train_set.shape))
print("Shape of Test Set: {}".format(test_set.shape))
print("Shape of Validation Set: {}".format(validation_set.shape))
train_set
test_set
validation_set
# dump data to pickle files
train_set.to_pickle("./data/train.pkl")
test_set.to_pickle("./data/test.pkl")
validation_set.to_pickle("./data/validation.pkl")
# read pickle to pandas
train_set = pd.read_pickle("./data/train.pkl")
test_set = pd.read_pickle("./data/test.pkl")
validation_set = pd.read_pickle("./data/validation.pkl")
We will now classify the data using a Multilayer Perceptron. Along with classifying the data, we will tune the MLP to find the optimal hyper-parameters. Once the optimal parameters are chosen based on our Macro F1 Score metric, we will compare our custom MLP class to Sklearn's MLP class and note the differences.
# Example adapted from https://github.com/rasbt/python-machine-learning-book/blob/master/code/ch12/ch12.ipynb
# Original Author: Sebastian Raschka
# This is the optional book we use in the course, excellent intuitions and straightforward programming examples
# please note, however, that this code has been manipulated to reflect our assumptions and notation.
import numpy as np
from scipy.special import expit
import sys
import pandas as pd
# start with a simple base classifier, which can't be fit or predicted
# it only has internal methods to be used by classes that will subclass it
class TwoLayerPerceptronBase(object):
    def __init__(self, n_hidden=30,
                 C=0.0, epochs=500, eta=0.001, random_state=None):
        np.random.seed(random_state)
        self.n_hidden = n_hidden
        self.l2_C = C
        self.epochs = epochs
        self.eta = eta

    @staticmethod
    def _encode_labels(y):
        """Encode labels into one-hot representation"""
        onehot = pd.get_dummies(y).values.T
        return onehot

    def _initialize_weights(self):
        """Initialize weights with small random numbers."""
        W1_num_elems = (self.n_features_ + 1) * self.n_hidden
        W1 = np.random.uniform(-1.0, 1.0, size=W1_num_elems)
        W1 = W1.reshape(self.n_hidden, self.n_features_ + 1)  # reshape to be W
        W2_num_elems = (self.n_hidden + 1) * self.n_output_
        W2 = np.random.uniform(-1.0, 1.0, size=W2_num_elems)
        W2 = W2.reshape(self.n_output_, self.n_hidden + 1)
        return W1, W2

    @staticmethod
    def _sigmoid(z):
        """Use scipy.special.expit to avoid overflow"""
        # equivalent to 1.0 / (1.0 + np.exp(-z))
        return expit(z)

    @staticmethod
    def _add_bias_unit(X, how='column'):
        """Add bias unit (column or row of 1s) to array at index 0"""
        if how == 'column':
            ones = np.ones((X.shape[0], 1))
            X_new = np.hstack((ones, X))
        elif how == 'row':
            ones = np.ones((1, X.shape[1]))
            X_new = np.vstack((ones, X))
        return X_new

    @staticmethod
    def _L2_reg(lambda_, W1, W2):
        """Compute L2-regularization cost"""
        # only compute for non-bias terms
        return (lambda_ / 2.0) * np.sqrt(np.mean(W1[:, 1:] ** 2) + np.mean(W2[:, 1:] ** 2))

    def _cost(self, A3, Y_enc, W1, W2):
        '''Get the objective function value'''
        cost = np.mean((Y_enc - A3) ** 2)
        L2_term = self._L2_reg(self.l2_C, W1, W2)
        return cost + L2_term
class TwoLayerPerceptron(TwoLayerPerceptronBase):
    def _feedforward(self, X, W1, W2):
        """Compute feedforward step

        Parameters
        ----------
        X : Input layer with original features.
        W1: Weight matrix for input layer -> hidden layer.
        W2: Weight matrix for hidden layer -> output layer.

        Returns
        -------
        A1-A3 : activations into each layer (and the output layer)
        Z1-Z2 : layer inputs
        """
        A1 = self._add_bias_unit(X.T, how='row')
        Z1 = W1 @ A1
        A2 = self._sigmoid(Z1)
        A2 = self._add_bias_unit(A2, how='row')
        Z2 = W2 @ A2
        A3 = self._sigmoid(Z2)
        return A1, Z1, A2, Z2, A3

    def _get_gradient(self, A1, A2, A3, Z1, Z2, Y_enc, W1, W2):
        """Compute gradient step using backpropagation."""
        grad1 = np.zeros(W1.shape)
        grad2 = np.zeros(W2.shape)
        # for each instance's activations
        for (a1, a2, a3, y) in zip(A1.T, A2.T, A3.T, Y_enc.T):
            dJ_dz2 = -2 * (y - a3) * a3 * (1 - a3)
            dJ_dz1 = dJ_dz2 @ W2 @ np.diag(a2 * (1 - a2))
            grad2 += dJ_dz2[:, np.newaxis] @ a2[np.newaxis, :]
            # don't incorporate the bias term in the z1 gradient
            # (it is added in a2 from another layer)
            grad1 += dJ_dz1[1:, np.newaxis] @ a1[np.newaxis, :]
        # regularize weights that are not bias terms
        grad1[:, 1:] += (W1[:, 1:] * self.l2_C)
        grad2[:, 1:] += (W2[:, 1:] * self.l2_C)
        return grad1, grad2

    def predict(self, X):
        """Predict class labels"""
        _, _, _, _, A3 = self._feedforward(X, self.W1, self.W2)
        y_pred = np.argmax(A3, axis=0)
        return y_pred

    def fit(self, X, y, print_progress=False):
        """Learn weights from training data."""
        X_data, y_data = X.copy(), y.copy()
        Y_enc = self._encode_labels(y)
        # init weights and setup matrices
        self.n_features_ = X_data.shape[1]
        self.n_output_ = Y_enc.shape[0]
        self.W1, self.W2 = self._initialize_weights()
        self.cost_ = []
        for i in range(self.epochs):
            if print_progress > 0 and (i + 1) % print_progress == 0:
                sys.stderr.write('\rEpoch: %d/%d' % (i + 1, self.epochs))
                sys.stderr.flush()
            # feedforward all instances
            A1, Z1, A2, Z2, A3 = self._feedforward(X_data, self.W1, self.W2)
            cost = self._cost(A3, Y_enc, self.W1, self.W2)
            self.cost_.append(cost)
            # compute gradient via backpropagation
            grad1, grad2 = self._get_gradient(A1=A1, A2=A2, A3=A3, Z1=Z1, Z2=Z2,
                                              Y_enc=Y_enc, W1=self.W1, W2=self.W2)
            # gradient descent update
            self.W1 -= self.eta * grad1
            self.W2 -= self.eta * grad2
        return self
class TwoLayerPerceptronVectorized(TwoLayerPerceptron):
    # just need a different gradient calculation
    def _get_gradient(self, A1, A2, A3, Z1, Z2, Y_enc, W1, W2):
        """Compute gradient step using vectorized backpropagation."""
        V2 = -2 * (Y_enc - A3) * A3 * (1 - A3)
        V1 = A2 * (1 - A2) * (W2.T @ V2)
        grad2 = V2 @ A2.T
        grad1 = V1[1:, :] @ A1.T
        # regularize weights that are not bias terms
        grad1[:, 1:] += W1[:, 1:] * self.l2_C
        grad2[:, 1:] += W2[:, 1:] * self.l2_C
        return grad1, grad2
This class is largely inspired by the TwoLayerPerceptron class that was developed in lecture. To extend the TwoLayerPerceptron class, we added a selectable activation function and objective function, as well as a selectable number of layers.
class MultilayerPerceptron(TwoLayerPerceptronVectorized):
    def __init__(self, nlayers=3, phi='sigmoid', cost='quadratic', **kwds):
        self.nlayers = nlayers
        self.phi_func = phi
        self.cost_func = cost
        self.alpha_ = 1.6732632423543772848170429916717
        self.lambda_ = 1.0507009873554804934193349852946
        super().__init__(**kwds)

    @staticmethod
    def _L2_reg(lambda_, weights):
        """Compute L2-regularization cost"""
        # only compute for non-bias terms
        sum_ = 0
        for w in weights:
            sum_ += np.mean(w[:, 1:] ** 2)
        return (lambda_ / 2.0) * np.sqrt(sum_)

    @staticmethod
    def _linear(x):
        return x

    @staticmethod
    def _relu(Z):
        return np.maximum(0, Z.copy())

    @staticmethod
    def _selu(Z):
        # note: Z * sigmoid(Z) is actually the SiLU/swish function,
        # not a true SELU
        return Z * expit(Z)

    def _initialize_weights(self):
        """Initialize weights with small random numbers."""
        _weights = []
        for i in range(self.nlayers - 1):
            if i == 0:
                W1 = np.random.randn(self.n_hidden, self.n_features_ + 1)
                W1[:, 1:] = W1[:, 1:] / np.sqrt(self.n_features_ + 1)  # don't saturate the neuron
                W1[:, :1] = 0  # common practice to start with zero bias
                _weights.append(W1)
                continue
            W1 = np.random.randn(self.n_hidden, _weights[i - 1].shape[0] + 1)
            W1[:, 1:] = W1[:, 1:] / np.sqrt(_weights[i - 1].shape[0] + 1)  # don't saturate the neuron
            W1[:, :1] = 0  # common practice to start with zero bias
            _weights.append(W1)
        # initialize weights for the output layer
        W2 = np.random.randn(self.n_output_, self.n_hidden + 1)
        W2[:, 1:] = W2[:, 1:] / np.sqrt(self.n_hidden + 1)  # don't saturate the neuron
        W2[:, :1] = 0  # common practice to start with zero bias
        _weights.append(W2)
        return _weights

    def _feedforward(self, X, weights):
        _layers = []  # list of A matrices
        # select the activation function
        if self.phi_func == 'sigmoid':
            _phi = self._sigmoid
        elif self.phi_func == 'linear':
            _phi = self._linear
        elif self.phi_func == 'relu':
            _phi = self._relu
        elif self.phi_func == 'selu':
            _phi = self._selu
        for i in range(self.nlayers + 1):
            # compute first layer
            if i == 0:
                A1 = self._add_bias_unit(X.T, how='row')
                _layers.append(A1)
                continue
            # compute ith layer
            A_prev = _layers[i - 1]
            W_prev = weights[i - 1]
            Z_prev = W_prev @ A_prev
            A_i = _phi(Z_prev)
            # add bias, if layer isn't the output layer
            if i < self.nlayers:
                A_i = self._add_bias_unit(A_i, how='row')
            # use sigmoid for the output layer if using relu
            if self.phi_func == 'relu' and i == self.nlayers:
                A_i = self._sigmoid(Z_prev)
            # add layer to _layers
            _layers.append(A_i)
        return _layers

    def fit(self, X, y, print_progress=False):
        """Learn weights from training data."""
        X_data, y_data = X.copy(), y.copy()
        Y_enc = self._encode_labels(y)
        # init weights and setup matrices
        self.n_features_ = X_data.shape[1]
        self.n_output_ = Y_enc.shape[0]
        self.weights = self._initialize_weights()
        self.grad_avgs = [np.zeros(self.epochs) for _ in range(len(self.weights))]
        self.cost_ = []
        for i in range(self.epochs):
            if print_progress > 0 and (i + 1) % print_progress == 0:
                sys.stderr.write('\rEpoch: %d/%d' % (i + 1, self.epochs))
                sys.stderr.flush()
            # feedforward all instances
            layers = self._feedforward(X_data, self.weights)
            cost = self._cost(layers[self.nlayers], Y_enc, self.weights)
            self.cost_.append(cost)
            # compute gradient via backpropagation
            grads = self._get_gradient(layers=layers, Y_enc=Y_enc, weights=self.weights)
            # save average gradients per layer
            for k in range(len(grads)):
                self.grad_avgs[k][i] = np.mean(grads[k])
            # update weights
            for j in range(len(self.weights)):
                self.weights[j] -= self.eta * grads[j]
        return self

    def predict(self, X):
        """Predict class labels"""
        layers = self._feedforward(X, self.weights)
        y_pred = np.argmax(layers[self.nlayers], axis=0)
        return y_pred

    def _cost(self, Af, Y_enc, weights):
        '''Get the objective function value'''
        if self.cost_func == 'quadratic':
            cost = np.mean((Y_enc - Af) ** 2)
        elif self.cost_func == 'cross_entropy':
            cost = -np.mean(np.nan_to_num(Y_enc * np.log(Af) + (1 - Y_enc) * np.log(1 - Af)))
        L2_term = self._L2_reg(self.l2_C, weights)
        return cost + L2_term

    def _get_gradient(self, layers, Y_enc, weights):
        """Compute the gradient for each layer using backpropagation."""
        _V = []
        _grads = []
        # calculate sensitivity at the final layer
        if self.cost_func == 'quadratic':
            Vf = -2 * (Y_enc - layers[self.nlayers]) * layers[self.nlayers] * (1 - layers[self.nlayers])
        elif self.cost_func == 'cross_entropy':
            Vf = (layers[self.nlayers] - Y_enc)
        # gradient wrt the last weight matrix
        grad_f = Vf @ layers[self.nlayers - 1].T
        # regularize gradient entries that are not bias terms
        grad_f[:, 1:] += weights[self.nlayers - 1][:, 1:] * self.l2_C
        _V.append(Vf)
        _grads.append(grad_f)
        # backpropagate through the remaining L-1 layers to calculate sensitivities
        for i in range(self.nlayers - 2, -1, -1):
            A_i = layers[i + 1]   # A_{i+1}
            W_i = weights[i + 1]  # W_{i+1}
            V_i2 = _V[0]          # uses sensitivity of layer (i+1)
            if self.phi_func == 'relu':
                V_i = (W_i.T[1:, :] @ V_i2)
                V_i[A_i[1:, :] <= 0] = 0
            else:
                # calculate sensitivity for layer i
                V_i = A_i[1:, :] * (1 - A_i[1:, :]) * (W_i.T[1:, :] @ V_i2)
            # insert sensitivity for layer i at front of list
            _V.insert(0, V_i)
            # calculate gradient for layer i
            grad_i = V_i @ layers[i].T
            # regularize weights that are not bias terms
            grad_i[:, 1:] += weights[i][:, 1:] * self.l2_C
            _grads.insert(0, grad_i)
        return _grads
def to_matrix(df):
    """Convert the image column of a DataFrame to a numpy matrix."""
    images_dim = []
    for img in df['image'].values:
        images_dim.append(img)
    return np.array(images_dim)
#encode categorical variables
encodings = { 'class': {'buildings':0, 'forest':1, 'glacier':2, 'mountain':3, 'sea':4, 'street':5}}
train_set.replace(encodings, inplace=True)
validation_set.replace(encodings, inplace=True)
test_set.replace(encodings, inplace=True)
#training data
X = to_matrix(train_set)
y = train_set['class'].values
print("X train shape: {}".format(X.shape))
print("y train shape: {}".format(y.shape))
#validation data
X_test = to_matrix(validation_set)
y_test = validation_set['class'].values
print("X test shape: {}".format(X_test.shape))
print("y test shape: {}".format(y_test.shape))
base_params = dict(
C=0.1, # tradeoff L2 regularizer
eta=0.001, # learning rate
random_state=1,
)
#parameter tunable options
phi_list = ['sigmoid','linear']
cost_list = ['quadratic','cross_entropy']
epochs_list = [50,100, 200]
n_layers_list = [2,3,4,5]
n_neurons_list = [30,50,80]
#create parameter dicts for every combination of params
param_list = []
for epoch in epochs_list:
    for phi in phi_list:
        for cost in cost_list:
            for n_layer in n_layers_list:
                for n_neurons in n_neurons_list:
                    params = base_params.copy()
                    params['phi'] = phi
                    params['cost'] = cost
                    params['epochs'] = epoch
                    params['nlayers'] = n_layer
                    params['n_hidden'] = n_neurons
                    param_list.append(params)
print(len(param_list))
from sklearn.metrics import f1_score

#evaluate model on each param combination
cost_param = []
for params in param_list:
    mlp = MultilayerPerceptron(**params)
    mlp.fit(X, y)
    yhat = mlp.predict(X_test)
    cost = f1_score(y_test, yhat, average='macro')
    cost_param.append((cost, params))
    print("Activation: {} | Objective Function: {} | N_layers: {} | N_hidden: {} | Epochs: {} | F1_Score: {}".format(
        params['phi'],
        params['cost'],
        params['nlayers'],
        params['n_hidden'],
        params['epochs'],
        cost
    ))
cost_param.sort(key=lambda x: x[0], reverse=True)
top_ten = []
for i, key in enumerate(cost_param[:10]):
    top_ten.append(key)
    print("{}. Activation: {} | Objective Function: {} | N_layers: {} | N_hidden: {} | Epochs: {} | F1_Score: {}".format(
        i + 1,
        key[1]['phi'],
        key[1]['cost'],
        key[1]['nlayers'],
        key[1]['n_hidden'],
        key[1]['epochs'],
        key[0]
    ))
def look_up_params(activation_, epoch_, layers_, param_list):
    for param in param_list:
        if (param[1]['phi'] == activation_) and (param[1]['epochs'] == epoch_) \
                and (param[1]['nlayers'] == layers_):
            return param
#preparing data for plotting
#create sigmoids list
sigmoids = []
for model in cost_param:
    if model[1]['phi'] == 'sigmoid':
        sigmoids.append((model[0], model[1]['epochs'], model[1]['nlayers']))
sigmoids.sort(key=lambda x: x[0], reverse=True)
#create linear list
linears = []
for model in cost_param:
    if model[1]['phi'] == 'linear':
        linears.append((model[0], model[1]['epochs'], model[1]['nlayers']))
linears.sort(key=lambda x: x[0], reverse=True)
#create quadratic list
quadratics = []
for model in cost_param:
    if model[1]['cost'] == 'quadratic':
        quadratics.append((model[0], model[1]['epochs'], model[1]['nlayers']))
quadratics.sort(key=lambda x: x[0], reverse=True)
#create cross_entropy list
cross_entropys = []
for model in cost_param:
    if model[1]['cost'] == 'cross_entropy':
        cross_entropys.append((model[0], model[1]['epochs'], model[1]['nlayers']))
cross_entropys.sort(key=lambda x: x[0], reverse=True)
# add some jitter for overlapping points: https://stackoverflow.com/a/21276920
def rand_jitter(arr):
    stdev = .01 * (max(arr) - min(arr))
    return arr + np.random.randn(len(arr)) * stdev
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
'''
X = epochs
Y = n_layers
Z = f1 score
red = sigmoid
green = linear
'''
types = ['Sigmoid', 'Linear']
colors = ['red', 'green']
scatter_dict = {0: {'x': [], 'y': [], 'z': []},
                1: {'x': [], 'y': [], 'z': []}}
fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(111, projection='3d')
for sig in sigmoids:
    scatter_dict[0]['x'].append(sig[1])
    scatter_dict[0]['y'].append(sig[2])
    scatter_dict[0]['z'].append(sig[0])
for lin in linears:
    scatter_dict[1]['x'].append(lin[1])
    scatter_dict[1]['y'].append(lin[2])
    scatter_dict[1]['z'].append(lin[0])
for i in range(len(colors)):
    ax.scatter(rand_jitter(scatter_dict[i]['x']),
               rand_jitter(scatter_dict[i]['y']),
               rand_jitter(scatter_dict[i]['z']),
               c=colors[i], marker='o', label=types[i], s=40, alpha=0.7)
plt.title("F1 Score of Sigmoid vs Linear", fontsize=20, y=1.08)
ax.set_xlabel('Epochs', fontsize=16, labelpad=20)
ax.set_ylabel('Number of Layers', fontsize=16, labelpad=20)
ax.set_zlabel('F1 Score', fontsize=16, labelpad=20)
ax.legend(loc="upper right", fontsize=14)
plt.show()
As seen from the plot, the models using the sigmoid activation function performed better than those using the linear activation function. Almost all of the linear F1 scores are near zero, indicating that a linear activation does a very poor job of mapping the input data to the classes.
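One way to see why the linear models score so poorly: a stack of linear layers is algebraically equivalent to a single linear layer, so extra depth adds no expressive power without a nonlinearity. A minimal numpy sketch (random demo weights, not our class's internals):

```python
import numpy as np

rng = np.random.default_rng(1)
W1 = rng.standard_normal((4, 3))  # "layer 1" weights
W2 = rng.standard_normal((3, 2))  # "layer 2" weights
x = rng.standard_normal(4)

deep = (x @ W1) @ W2       # two linear layers applied in sequence
shallow = x @ (W1 @ W2)    # one layer with the collapsed weight matrix
print(np.allclose(deep, shallow))  # True: depth adds nothing without a nonlinearity
```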
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
'''
X = epochs
Y = n_layers
Z = f1 score
red = quadratic
blue = cross_entropy
'''
types = ['quadratic', 'cross_entropy']
colors = ['red', 'blue']
scatter_dict = {0: {'x': [], 'y': [], 'z': []},
                1: {'x': [], 'y': [], 'z': []}}
fig = plt.figure(figsize=(10, 10))
ax = fig.add_subplot(111, projection='3d')
for q in quadratics:
    scatter_dict[0]['x'].append(q[1])
    scatter_dict[0]['y'].append(q[2])
    scatter_dict[0]['z'].append(q[0])
for c in cross_entropys:
    scatter_dict[1]['x'].append(c[1])
    scatter_dict[1]['y'].append(c[2])
    scatter_dict[1]['z'].append(c[0])
for i in range(len(colors)):
    ax.scatter(rand_jitter(scatter_dict[i]['x']),
               rand_jitter(scatter_dict[i]['y']),
               rand_jitter(scatter_dict[i]['z']),
               c=colors[i], marker='o', label=types[i], s=40, alpha=0.7)
plt.title("F1 Score of Quadratic vs Cross_Entropy", fontsize=20, y=1.08)
ax.set_xlabel('Epochs', fontsize=16, labelpad=20)
ax.set_ylabel('Number of Layers', fontsize=16, labelpad=20)
ax.set_zlabel('F1 Score', fontsize=16, labelpad=20)
ax.legend(loc="upper right", fontsize=14)
plt.show()
As seen from the plot, the models using the cross-entropy objective function outperformed the models using the quadratic objective function. This is likely because cross-entropy penalizes confident misclassifications much more heavily than quadratic loss does, so it provides a stronger learning signal for probability outputs.
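A small sketch of the usual explanation: with a sigmoid output unit, the quadratic loss gradient carries a factor of σ'(z) that vanishes when the unit saturates, while the cross-entropy gradient is just the raw error, so a confidently wrong output still produces a strong update. (Standalone numpy illustration, not our class's internals.)

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

y_true, z = 1.0, -4.0  # target is 1, but the pre-activation is confidently wrong
a = sigmoid(z)         # output near 0.018

# dL/dz for quadratic loss L = (a - y)^2 / 2 includes sigma'(z) = a(1 - a):
grad_quadratic = (a - y_true) * a * (1 - a)
# dL/dz for cross-entropy loss is simply the error:
grad_cross_entropy = a - y_true

print(abs(grad_quadratic) < abs(grad_cross_entropy))  # True: cross-entropy keeps learning
```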
#test data
X_final = to_matrix(test_set)
y_final = test_set['class'].values
print("X test shape: {}".format(X_final.shape))
print("y test shape: {}".format(y_final.shape))
params = dict(n_hidden=80,
              C=0.1,       # tradeoff L2 regularizer
              epochs=200,  # iterations
              eta=0.001,   # learning rate
              random_state=1,
              nlayers=3,
              cost='cross_entropy',
              )
mlp_relu = MultilayerPerceptron(phi='relu',**params)
mlp_silu = MultilayerPerceptron(phi='selu',**params)
mlp_sig = MultilayerPerceptron(phi='sigmoid',**params)
mlp_lin = MultilayerPerceptron(phi='linear',**params)
#fit
mlp_relu.fit(X, y, print_progress=True)
mlp_silu.fit(X, y, print_progress=True)
mlp_sig.fit(X, y, print_progress=True)
mlp_lin.fit(X, y, print_progress=True)
#evaluate on test set
yhat_relu = mlp_relu.predict(X_final)
yhat_silu = mlp_silu.predict(X_final)
yhat_sig = mlp_sig.predict(X_final)
yhat_lin = mlp_lin.predict(X_final)
print("F1 Score Relu:", f1_score(y_final,yhat_relu, average='macro'))
print("F1 Score Silu:", f1_score(y_final,yhat_silu, average='macro'))
print("F1 Score Sigmoid:", f1_score(y_final,yhat_sig, average='macro'))
print("F1 Score Linear:", f1_score(y_final,yhat_lin, average='macro'))
The F1 scores suggest that most of the models failed to converge. The sigmoid model is the only one with a semi-worthwhile score.
fig, axs = plt.subplots(2, 2)
#plot gradients of relu model
for i, grad in enumerate(mlp_relu.grad_avgs):
    w = 'w{}'.format(i)
    axs[0, 0].plot(abs(grad[10:]), label=w)
axs[0, 0].legend()
#plot gradients of silu model
for i, grad in enumerate(mlp_silu.grad_avgs):
    w = 'w{}'.format(i)
    axs[0, 1].plot(abs(grad[10:]), label=w)
axs[0, 1].legend()
#plot gradients of sigmoid model
for i, grad in enumerate(mlp_sig.grad_avgs):
    w = 'w{}'.format(i)
    axs[1, 0].plot(abs(grad[10:]), label=w)
axs[1, 0].legend()
#plot gradients of linear model
for i, grad in enumerate(mlp_lin.grad_avgs):
    w = 'w{}'.format(i)
    axs[1, 1].plot(abs(grad[10:]), label=w)
axs[1, 1].legend()
plt.ylabel('Avg Gradient Magnitude')
plt.xlabel('Epoch')
axs[0, 0].set_title("Relu")
axs[0, 1].set_title("Silu")
axs[1, 0].set_title("Sigmoid")
axs[1, 1].set_title("Linear")
plt.show()
The gradient plots agree with the F1 scores we saw before: none of the models appear to have converged.
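The visual judgment above could be backed by a quick numeric heuristic. Assuming `grad_avgs` holds one array of per-epoch average gradient magnitudes per weight matrix (as our class records), a sketch with an illustrative threshold:

```python
import numpy as np

def appears_converged(grad_avgs, tail=10, tol=1e-4):
    """Heuristic convergence check: every layer's mean absolute gradient
    over the last `tail` epochs must fall below `tol`."""
    return all(np.mean(np.abs(np.asarray(g)[-tail:])) < tol for g in grad_avgs)

# e.g. appears_converged(mlp_relu.grad_avgs) would come back False for these runs
```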
ax = plt.subplot(1, 1, 1)
for i, grad in enumerate(mlp.grad_avgs):
    w = 'w{}'.format(i)
    plt.plot(abs(grad[10:]), label=w)
plt.legend()
plt.ylabel('Average gradient magnitude for Sigmoid')
plt.xlabel('Iteration')
plt.show()
from sklearn.neural_network import MLPClassifier
params_custom = dict(n_hidden=80,
                     C=0.1,       # tradeoff L2 regularizer
                     epochs=200,  # iterations
                     eta=0.001,   # learning rate
                     random_state=1,
                     nlayers=3,
                     cost='cross_entropy',
                     phi='sigmoid'
                     )
sk_mlp = MLPClassifier(hidden_layer_sizes=(50,),
                       learning_rate_init=0.01,
                       max_iter=100,
                       random_state=1,
                       activation='logistic'
                       )
custom_mlp = MultilayerPerceptron(**params_custom)
sk_mlp.fit(X, y)
custom_mlp.fit(X,y)
yhat_sk = sk_mlp.predict(X_final)
yhat_custom = custom_mlp.predict(X_final)
print("Sklearn MLP F1 Score:", f1_score(y_final, yhat_sk, average='macro'))
print("Custom MLP F1 Score:", f1_score(y_final, yhat_custom, average='macro'))
When comparing Sklearn's model to our custom model, they perform very similarly. However, as shown above, neither model converged. This failure to converge is likely due to the large size of the feature space: even after PCA, each flattened image still has 4,000 components, so it is very unlikely that either model would converge in the epochs allotted.
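If the 4,000-component feature space is indeed the bottleneck, one hypothetical remedy is a more aggressive projection before training. A minimal SVD-based PCA sketch on demo data (the function name and `n_components` value are ours and would need tuning against explained variance):

```python
import numpy as np

def pca_reduce(X, n_components):
    """Project the rows of X onto their top principal components via SVD."""
    Xc = X - X.mean(axis=0)  # center each feature
    U, S, Vt = np.linalg.svd(Xc, full_matrices=False)
    return Xc @ Vt[:n_components].T  # shape (n_samples, n_components)

X_demo = np.random.default_rng(0).standard_normal((50, 200))
X_small = pca_reduce(X_demo, 20)
print(X_small.shape)  # (50, 20)
```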